1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 import static rx.Observable.create;
35
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicLong;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import rx.Notification;
41 import rx.Observable;
42 import rx.Observable.OnSubscribe;
43 import rx.Observable.Operator;
44 import rx.Producer;
45 import rx.Scheduler;
46 import rx.Subscriber;
47 import rx.functions.Action0;
48 import rx.functions.Func1;
49 import rx.functions.Func2;
50 import rx.schedulers.Schedulers;
51 import rx.subjects.PublishSubject;
52 import rx.subscriptions.SerialSubscription;
53
54 public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
55
56 static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INIFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
57 @Override
58 public Observable<?> call(Observable<? extends Notification<?>> ts) {
59 return ts.map(new Func1<Notification<?>, Notification<?>>() {
60 @Override
61 public Notification<?> call(Notification<?> terminal) {
62 return Notification.createOnNext(null);
63 }
64 });
65 }
66 };
67
68 public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> {
69 private final long count;
70
71 public RedoFinite(long count) {
72 this.count = count;
73 }
74
75 @Override
76 public Observable<?> call(Observable<? extends Notification<?>> ts) {
77 return ts.map(new Func1<Notification<?>, Notification<?>>() {
78
79 int num=0;
80
81 @Override
82 public Notification<?> call(Notification<?> terminalNotification) {
83 if(count == 0) {
84 return terminalNotification;
85 }
86
87 num++;
88 if(num <= count) {
89 return Notification.createOnNext(num);
90 } else {
91 return terminalNotification;
92 }
93 }
94
95 }).dematerialize();
96 }
97 }
98
99 public static final class RetryWithPredicate implements Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>> {
100 private Func2<Integer, Throwable, Boolean> predicate;
101
102 public RetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
103 this.predicate = predicate;
104 }
105
106 @Override
107 public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
108 return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
109 @SuppressWarnings("unchecked")
110 @Override
111 public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
112 final int value = n.getValue();
113 if (predicate.call(value, term.getThrowable()).booleanValue())
114 return Notification.createOnNext(value + 1);
115 else
116 return (Notification<Integer>) term;
117 }
118 });
119 }
120 }
121
122 public static <T> Observable<T> retry(Observable<T> source) {
123 return retry(source, REDO_INIFINITE);
124 }
125
126 public static <T> Observable<T> retry(Observable<T> source, final long count) {
127 if (count < 0)
128 throw new IllegalArgumentException("count >= 0 expected");
129 if (count == 0)
130 return source;
131 return retry(source, new RedoFinite(count));
132 }
133
134 public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
135 return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
136 }
137
138 public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
139 return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
140 }
141
142 public static <T> Observable<T> repeat(Observable<T> source) {
143 return repeat(source, Schedulers.trampoline());
144 }
145
146 public static <T> Observable<T> repeat(Observable<T> source, Scheduler scheduler) {
147 return repeat(source, REDO_INIFINITE, scheduler);
148 }
149
150 public static <T> Observable<T> repeat(Observable<T> source, final long count) {
151 return repeat(source, count, Schedulers.trampoline());
152 }
153
154 public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
155 if(count == 0) {
156 return Observable.empty();
157 }
158 if (count < 0)
159 throw new IllegalArgumentException("count >= 0 expected");
160 return repeat(source, new RedoFinite(count - 1), scheduler);
161 }
162
163 public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
164 return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
165 }
166
167 public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
168 return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
169 }
170
171 public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
172 return create(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
173 }
174
/** The observable that is (re)subscribed to. */
private final Observable<T> source;
/** Maps the stream of terminal notifications to the stream whose items trigger a resubscription. */
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
/** When true, a completion of the source is delivered to the child instead of to the handler. */
private final boolean stopOnComplete;
/** When true, an error of the source is delivered to the child instead of to the handler. */
private final boolean stopOnError;
/** Supplies the worker on which subscriptions to the source and to the restart stream are scheduled. */
private final Scheduler scheduler;

/**
 * Creates the operator. All state is assigned exactly once here, so every field is
 * {@code final}; instances are read from scheduler workers after construction.
 *
 * @param source the observable to (re)subscribe to
 * @param f the handler that turns terminal notifications into restart signals
 * @param stopOnComplete whether a source completion terminates the child directly
 * @param stopOnError whether a source error terminates the child directly
 * @param scheduler the scheduler from which a worker is created per child subscriber
 */
private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
        Scheduler scheduler) {
    this.source = source;
    this.controlHandlerFunction = f;
    this.stopOnComplete = stopOnComplete;
    this.stopOnError = stopOnError;
    this.scheduler = scheduler;
}
189
190 @Override
191 public void call(final Subscriber<? super T> child) {
192 final AtomicBoolean isLocked = new AtomicBoolean(true);
193 final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
194
195 final AtomicLong consumerCapacity = new AtomicLong(0l);
196 final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
197
198 final Scheduler.Worker worker = scheduler.createWorker();
199 child.add(worker);
200
201 final SerialSubscription sourceSubscriptions = new SerialSubscription();
202 child.add(sourceSubscriptions);
203
204 final PublishSubject<Notification<?>> terminals = PublishSubject.create();
205
206 final Action0 subscribeToSource = new Action0() {
207 @Override
208 public void call() {
209 if (child.isUnsubscribed()) {
210 return;
211 }
212
213 Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
214 boolean done;
215 @Override
216 public void onCompleted() {
217 if (!done) {
218 done = true;
219 currentProducer.set(null);
220 unsubscribe();
221 terminals.onNext(Notification.createOnCompleted());
222 }
223 }
224
225 @Override
226 public void onError(Throwable e) {
227 if (!done) {
228 done = true;
229 currentProducer.set(null);
230 unsubscribe();
231 terminals.onNext(Notification.createOnError(e));
232 }
233 }
234
235 @Override
236 public void onNext(T v) {
237 if (!done) {
238 if (consumerCapacity.get() != Long.MAX_VALUE) {
239 consumerCapacity.decrementAndGet();
240 }
241 child.onNext(v);
242 }
243 }
244
245 @Override
246 public void setProducer(Producer producer) {
247 currentProducer.set(producer);
248 long c = consumerCapacity.get();
249 if (c > 0) {
250 producer.request(c);
251 }
252 }
253 };
254
255
256 sourceSubscriptions.set(terminalDelegatingSubscriber);
257 source.unsafeSubscribe(terminalDelegatingSubscriber);
258 }
259 };
260
261
262
263
264 final Observable<?> restarts = controlHandlerFunction.call(
265 terminals.lift(new Operator<Notification<?>, Notification<?>>() {
266 @Override
267 public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
268 return new Subscriber<Notification<?>>(filteredTerminals) {
269 @Override
270 public void onCompleted() {
271 filteredTerminals.onCompleted();
272 }
273
274 @Override
275 public void onError(Throwable e) {
276 filteredTerminals.onError(e);
277 }
278
279 @Override
280 public void onNext(Notification<?> t) {
281 if (t.isOnCompleted() && stopOnComplete)
282 child.onCompleted();
283 else if (t.isOnError() && stopOnError)
284 child.onError(t.getThrowable());
285 else {
286 isLocked.set(false);
287 filteredTerminals.onNext(t);
288 }
289 }
290
291 @Override
292 public void setProducer(Producer producer) {
293 producer.request(Long.MAX_VALUE);
294 }
295 };
296 }
297 }));
298
299
300 worker.schedule(new Action0() {
301 @Override
302 public void call() {
303 restarts.unsafeSubscribe(new Subscriber<Object>(child) {
304 @Override
305 public void onCompleted() {
306 child.onCompleted();
307 }
308
309 @Override
310 public void onError(Throwable e) {
311 child.onError(e);
312 }
313
314 @Override
315 public void onNext(Object t) {
316 if (!isLocked.get() && !child.isUnsubscribed()) {
317 if (consumerCapacity.get() > 0) {
318 worker.schedule(subscribeToSource);
319 } else {
320 resumeBoundary.compareAndSet(false, true);
321 }
322 }
323 }
324
325 @Override
326 public void setProducer(Producer producer) {
327 producer.request(Long.MAX_VALUE);
328 }
329 });
330 }
331 });
332
333 child.setProducer(new Producer() {
334
335 @Override
336 public void request(final long n) {
337 long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
338 Producer producer = currentProducer.get();
339 if (producer != null) {
340 producer.request(n);
341 } else
342 if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
343 worker.schedule(subscribeToSource);
344 }
345 }
346 });
347
348 }
349 }